查看原文
其他

Python 中 Redis 库分布式锁简单分析

Python猫 2022-04-12

The following article is from 懒编程 Author ayuliao

 △点击上方“Python猫”关注 ,回复“1”领取电子书

作者:ayuliao

来源:懒编程

简介

我们常会遇到某段逻辑在相同时间段内只希望被单个实例执行,而在微服务架构中,一个程序可能会存在多个实例,此时就需要通过分布式锁来实现串行执行。

最简单的分布式锁无非就是找到对于多个程序实例而言单一的存在,比如MySQL数据只有一个或Redis只有一个,此时都可以利用这单一的存在构建一个锁,多个程序实例要执行某段逻辑前必须先获得这个锁,然后才能执行。

因为某些原因,上班的时候我和同事一起研究了一下Python redis库中分布式锁的实现源码,这里简单分享一下。

通过pip可以安装这个库。

pip install redis==2.10.6

这里以这个库的2.10.6版本为例,对它Redis分布式锁源码进行简单的分析。

代码分析

实例化StrictRedis对象后,使用其中的lock方法便可获得一个分布式锁。

首先看一下lock方法对应的源码。

def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,
             lock_class=None, thread_local=True):

        if lock_class is None:
            if self._use_lua_lock is None:
                # the first time .lock() is called, determine if we can use
                # Lua by attempting to register the necessary scripts
                try:
                    LuaLock.register_scripts(self)
                    self._use_lua_lock = True
                except ResponseError:
                    self._use_lua_lock = False
            lock_class = self._use_lua_lock and LuaLock or Lock
        return lock_class(self, name, timeout=timeout, sleep=sleep,
                          blocking_timeout=blocking_timeout,
                          thread_local=thread_local)

该方法提供了多个参数,其中:

  • name用于指定锁名
  • timeout用于指定锁的超时时间
  • sleep用于指定线程睡眠时间,线程争夺锁的过程本质就是一个循环,每过sleep秒,就会尝试去获取锁对象
  • blocking_timeout用于指定阻塞超时时间,当多个实例争夺锁时,这个时间就是实例等待锁的最长时间
  • lock_class表示使用锁的类对象
  • thread_local表示是否线程安全

方法中最关键的一句代码为lock_class = self._use_lua_lock and LuaLock or Lock,确定了lock_class后,便实例化该lock_class即可。

lock_class可以为LuaLock也可为Lock,经过简单分析,Lock类才是关键,LuaLock类继承自Lock,通过Lua代码实现Redis的一些操作,这里着重看Lock类。

首先看到该类的__init__方法。

class Lock(object):
    def __init__(self, redis, name, timeout=None, sleep=0.1,
                 blocking=True, blocking_timeout=None, thread_local=True):

        self.redis = redis
        self.name = name
        self.timeout = timeout
        self.sleep = sleep
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self.thread_local = bool(thread_local)
        self.local = threading.local() if self.thread_local else dummy()
        self.local.token = None
        if self.timeout and self.sleep > self.timeout:
            raise LockError("'sleep' must be less than 'timeout'")

__init__方法初始化不同的属性,其中self.local为线程的本地字段,用于存储该线程特有的数据,不与其他线程进行共享。

此外,在__init__方法中对timeout与sleep进行的判断,如果线程等待锁时的睡眠时间大于锁的超时时间,则直接返回错误。

接着重点看Lock类中的acquire方法,该方法代码如下。

import time as mod_time

class Lock(object):

    def acquire(self, blocking=None, blocking_timeout=None):
        sleep = self.sleep
        token = b(uuid.uuid1().hex)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout
        stop_trying_at = None
        if blocking_timeout is not None:
            stop_trying_at = mod_time.time() + blocking_timeout
        while 1:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            if stop_trying_at is not None and mod_time.time() > stop_trying_at:
                return False
            mod_time.sleep(sleep)

acquire方法的主逻辑就是一个死循环,在死循环中调用do_acquire方法获取Redis分布式锁,如果成功获得锁,则将token存储到当前线程的local对象中,如果没有获得,则判断blocking,如果blocking为Flase,则不再阻塞,直接返回结果,反之,则判断当前时间是否超过blocking_timeout,超过,同样返回False,反之,通过sleep方法让当前线程睡眠sleep秒。

进一步分析do_acquire方法,代码如下:

    def do_acquire(self, token):
        if self.redis.setnx(self.name, token):
            if self.timeout:
                # convert to milliseconds
                timeout = int(self.timeout * 1000# 转成毫秒
                self.redis.pexpire(self.name, timeout)
            return True
        return False

do_acquire方法中,一开始通过redis的setnx方法将name对着作为key,token作为value,setnx方法只有在key不存的情况下,才能正常的将value存入Redis中,若key依存,该方法不做任何操作,此时就相当于没有获取到锁。

将token成功插入后,则判断有无超时时间,如果设置了timeout,则通过pexpire方法将redis中name这个key的超时设置一下,因为pexpire方法是以毫秒为单位的,所以需要先将timeout转换成毫秒单位。

如果没有设置timeout,那么name这个key只能通过do_release方法中的逻辑清除。

至此,我们清楚的知道了,Redis分布式锁的本质其实就是Redis中的一个key-value,非常简单...

理清锁的获取逻辑后,来看一下相应的释放逻辑,主要关注release方法,该方法代码如下。

    def release(self):
        "Releases the already acquired lock"
        expected_token = self.local.token
        if expected_token is None:
            raise LockError("Cannot release an unlocked lock")
        self.local.token = None
        self.do_release(expected_token)

release方法中先将线程中的token取出,并将其置为None,然后调用do_release方法实现锁的释放,do_release方法代码如下。

    def do_release(self, expected_token):
        name = self.name

        def execute_release(pipe):
            lock_value = pipe.get(name)
            if lock_value != expected_token:
                raise LockError("Cannot release a lock that's no longer owned")
            pipe.delete(name)

        self.redis.transaction(execute_release, name)

do_release方法的逻辑非常简单,其主要逻辑在execute_release方法,通过Redis的transaction方法开启一个事务来执行execute_release方法中逻辑。

在execute_release中,首先通过get方法获取name这个key对应的value,获得后,通过delete方法将其删除,实现Redis分布式锁的释放。

blocking属性

观察到acquire方法的这段代码。

        while 1:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            if stop_trying_at is not None and mod_time.time() > stop_trying_at:
                return False
            mod_time.sleep(sleep)

如果blocking为True,获取不到锁,则执行后面的逻辑,让线程睡眠,阻塞等待其他线程将锁释放;如果blocking为False,获取不到锁,则直接返回获取锁失败。

这就会引出几种情况,假设现在有线程A与线程B都需要执行相同的逻辑,执行前需要获取锁。

如果线程A在执行的过程中,线程B也要执行了,如果blocking为True,此时线程B会被阻塞,等待线程A是否Redis锁;如果blocking为False,线程B此时获取不到锁,不执行相同的逻辑。

如果线程A执行完了,此时线程B到来,如果blocking为True或False,此时线程B都不会被阻塞并成功拿到锁,执行相同的逻辑。

一个简单的结论是,blocking无法保证逻辑是否被单次执行,如果希望通过Redis分布式锁让逻辑只执行一次,依旧需要从业务层面做控制,比如MySQL中的业务数据是否被修改或Redis中是否记录这业务数据等。

结尾

现在很多业务都离不开Redis,它已经成为互联网中的基础设施了,Redis有很多有趣的内容可以跟大家分享。

前段时间看见Redis之父退居二线,说已经为Redis工作了10年了,每天都要revice、merge他人的代码,这种工作让他没有创造东西的快乐,所以决定退居二线,将Redis交由社区运营,这让我有些感慨,软件工程是创造性的工作,适当的放空、阅读与行业无关的书籍其实有助于激发创造力。

最后感谢你的阅读,我们下篇文章见。

Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信(谢绝广告党,非诚勿扰!)~

近期热门文章推荐:

耗时两年,我终于出了一本电子书!

架构篇:什么才是真正的架构设计?

为什么 Python 多线程无法利用多核?

Python 任务自动化工具 tox 教程

感谢创作者的好文

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存